package com.nx.run.shopify.config.mq;

import com.google.gson.Gson;
import com.nx.adaper.shopify.mq.listener.ShopifyMqMessageListener;
import com.nx.adaper.shopify.support.AliMqProps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.Collections;

@Slf4j
@Configuration
public class ConsumeConfig {

    private static final Gson GSON = new Gson();

    @Resource
    private AliMqProps aliMqProps;

    @Resource
    private ShopifyMqMessageListener shopifyMqMessageListener;

    @Bean(destroyMethod = "close", name = "MqConsumer")
    public PushConsumer buildConsumer() throws ClientException {
        /**
         * 实例接入点，从控制台实例详情页的接入点页签中获取。
         * 如果是在阿里云ECS内网访问，建议填写VPC接入点。
         * 如果是在本地公网访问，或者是线下IDC环境访问，可以使用公网接入点。使用公网接入点访问，必须开启实例的公网访问功能。
         */
        String endpoints = aliMqProps.getEndpoints();
        //指定需要订阅哪个目标Topic，Topic需要提前在控制台创建，如果不创建直接使用会返回报错。
        String topic = aliMqProps.getTopic();
        //为消费者指定所属的消费者分组，Group需要提前在控制台创建，如果不创建直接使用会返回报错。
        String consumerGroup = aliMqProps.getConsumerGroup();
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder()
                .setNamespace(aliMqProps.getNamespace())
                .setEndpoints(endpoints);
        /**
         * 如果是使用公网接入点访问，configuration还需要设置实例的用户名和密码。用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 如果是在阿里云ECS内网访问，无需填写该配置，服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例，公网访问必须设置实例的用户名密码，当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        builder.setCredentialProvider(new StaticSessionCredentialsProvider(aliMqProps.getAccessKey(), aliMqProps.getAccessSecret()));
        ClientConfiguration clientConfiguration = builder.build();
        //订阅消息的过滤规则，表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化PushConsumer，需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //设置消费者分组。
                .setConsumerGroup(consumerGroup)
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //设置消费监听器。
                .setMessageListener(messageView -> {
                    log.info("[MQ消费者]收到消息，消息ID：{}", messageView.getMessageId());
                    try {
                        ConsumeResult consume = shopifyMqMessageListener.consume(messageView);
                        log.info("[MQ消费者]处理完成，消息ID：{}，处理结果：{}", messageView.getMessageId(), consume);
                        return consume;
                    } catch (Exception e) {
                        log.error("[MQ消费者]处理异常，消息ID：{}，异常信息：{}", messageView.getMessageId(), e.getMessage());
                        log.error("异常堆栈:",e);
                        return ConsumeResult.FAILURE;
                    }
                })
                .build();
        return pushConsumer;
    }

}
